package com.cloudansys.core.flink.function;

import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;

/**
 * Window function that collapses all entities seen in one window into a single
 * representative entity per (projectId, serialCode) pair, annotated with the
 * maximum and minimum of the first indicator observed for that pair.
 *
 * <p>For every pair, the first usable entity encountered in the window is kept as
 * the representative. Its {@code values[1]} is overwritten with the maximum and its
 * {@code values[2]} with the minimum of {@code values[0]} across all usable entities
 * of that pair. The representative is mutated in place, not copied.
 *
 * <p>An entity is considered usable when its values array is non-null, has at least
 * three slots, and its first slot is non-null. Unusable entities are skipped so that
 * a single malformed record cannot fail the whole window.
 */
@Slf4j
public class MaxAndMinProcessor extends ProcessWindowFunction<List<MultiDataEntity>, List<MultiDataEntity>, String, TimeWindow> {

    /** Slot of the values array holding the sample that is aggregated. */
    private static final int SAMPLE_INDEX = 0;

    /** Slot of the values array that receives the window maximum. */
    private static final int MAX_INDEX = 1;

    /** Slot of the values array that receives the window minimum. */
    private static final int MIN_INDEX = 2;

    /**
     * Emits one list per window containing one entity per (projectId, serialCode)
     * pair, carrying that pair's maximum and minimum first-indicator value.
     * The original author's note states the window is 5 seconds long; the window
     * size itself is configured by the caller, not here.
     *
     * @param k        the key of the keyed stream this window belongs to (unused)
     * @param context  the window context (unused)
     * @param elements all batches of entities assigned to this window
     * @param out      collector receiving exactly one list per invocation; the list is
     *                 empty when no usable entity was seen, and is ordered by first
     *                 appearance of each pair
     * @throws Exception declared by the overridden method
     */
    @Override
    public void process(String k, Context context, Iterable<List<MultiDataEntity>> elements, Collector<List<MultiDataEntity>> out) throws Exception {
        // Keyed by the (projectId, serialCode) pair rather than the concatenated string,
        // so that e.g. ("1", "23") and ("12", "3") are not merged into one group.
        // LinkedHashMap keeps the output order deterministic (first-seen order).
        Map<List<String>, MultiDataEntity> representatives = new LinkedHashMap<>();
        // Running {min, max} of the first indicator for each pair.
        Map<List<String>, double[]> ranges = new HashMap<>();

        for (List<MultiDataEntity> batch : elements) {
            if (batch == null) {
                continue;
            }
            for (MultiDataEntity entity : batch) {
                if (entity == null) {
                    continue;
                }
                Double[] values = entity.getValues();
                // Skip records that have no sample or no room for the max/min slots.
                if (values == null || values.length <= MIN_INDEX || values[SAMPLE_INDEX] == null) {
                    continue;
                }
                double sample = values[SAMPLE_INDEX];
                List<String> key = Arrays.asList(entity.getProjectId(), entity.getSerialCode());

                double[] range = ranges.get(key);
                if (range == null) {
                    // First usable entity of this pair: it becomes the representative,
                    // and its sample is both the minimum and the maximum so far.
                    ranges.put(key, new double[]{sample, sample});
                    representatives.put(key, entity);
                } else {
                    // Double.compare gives the same total order a TreeSet<Double> would use
                    // (NaN sorts last, -0.0 sorts before 0.0).
                    if (Double.compare(sample, range[0]) < 0) {
                        range[0] = sample;
                    }
                    if (Double.compare(sample, range[1]) > 0) {
                        range[1] = sample;
                    }
                }
            }
        }

        // Write the maximum into the second indicator and the minimum into the third.
        // With a single distinct sample, both slots receive that same sample.
        for (Map.Entry<List<String>, MultiDataEntity> entry : representatives.entrySet()) {
            double[] range = ranges.get(entry.getKey());
            Double[] values = entry.getValue().getValues();
            values[MAX_INDEX] = range[1];
            values[MIN_INDEX] = range[0];
        }

        out.collect(new ArrayList<>(representatives.values()));
    }
}
